Pescador demo

This notebook illustrates some of the basic functionality of pescador, a package that facilitates iterative learning from data streams (implemented as Python generators).


In [6]:
import pescador

import numpy as np
np.set_printoptions(precision=4)
import sklearn
import sklearn.datasets
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection

In [7]:
def batch_sampler(X, Y, batch_size=20, scale=1e-1):
    '''A Gaussian noise generator for data
    
    Parameters
    ----------
    X : ndarray
        features, n_samples by dimensions
        
    Y : ndarray
        labels, n_samples
        
    batch_size : int
        size of the minibatches to generate
        
    scale : float > 0
        scale of the noise to add
        
    Generates
    ---------
    data
        An infinite stream of data dictionaries
        batch = dict(X=X[i] + noise, Y=Y[i])
    '''
    
    X = np.atleast_2d(X)
    Y = np.atleast_1d(Y)

    
    n, d = X.shape
    
    while True:
        i = np.random.randint(0, n, size=batch_size)
        
        noise = scale * np.random.randn(batch_size, d)
        
        yield {'X': X[i] + noise, 'Y': Y[i]}

In [8]:
# Load up the iris dataset for the demo
data = sklearn.datasets.load_iris()
X, Y = data.data, data.target
classes = np.unique(Y)

In [9]:
# What does the data stream look like?

# First, we'll wrap the generator function in a Streamer object.
# This is necessary for a few reasons, notably so that we can re-instantiate
# the generator multiple times (e.g., once per epoch).
batches = pescador.Streamer(batch_sampler, X, Y)

for q in batches(max_iter=3):
    print(q)


{'X': array([[ 4.4258,  3.1112,  1.3129,  0.2198],
       [ 4.9799,  3.5101,  1.364 ,  0.4528],
       [ 6.1828,  2.1968,  4.3321,  1.4157],
       [ 5.0051,  2.3551,  3.2314,  0.9696],
       [ 5.8826,  2.8636,  4.3867,  1.6251],
       [ 5.9263,  2.8091,  4.1766,  1.0761],
       [ 4.562 ,  3.2333,  1.4327,  0.2532],
       [ 7.3517,  2.8017,  6.1302,  2.057 ],
       [ 6.5248,  2.9433,  5.867 ,  2.2307],
       [ 5.6243,  2.7135,  4.267 ,  1.3158],
       [ 4.4369,  3.1829,  1.3174,  0.2398],
       [ 4.9293,  3.0768,  1.4912,  0.1628],
       [ 5.7435,  2.5492,  3.5466,  0.8201],
       [ 5.5816,  4.1874,  1.5755,  0.5114],
       [ 7.3886,  3.5892,  6.0697,  2.6535],
       [ 5.6016,  2.6809,  4.1578,  1.2411],
       [ 6.3535,  3.42  ,  6.1856,  2.5321],
       [ 6.1428,  2.6925,  5.5733,  1.4021],
       [ 4.9861,  3.2372,  1.7598,  0.3836],
       [ 6.4367,  2.3859,  5.8194,  1.7669]]), 'Y': array([0, 0, 1, 1, 1, 1, 0, 2, 2, 1, 0, 0, 1, 0, 2, 1, 2, 2, 0, 2])}
{'X': array([[ 5.4006,  4.1489,  1.5627,  0.2354],
       [ 5.8643,  2.4767,  3.9196,  1.2635],
       [ 7.7248,  2.8787,  6.7759,  1.937 ],
       [ 5.4541,  3.6686,  1.4243,  0.2001],
       [ 7.944 ,  3.7218,  6.2846,  2.0293],
       [ 6.4046,  2.6964,  5.497 ,  2.0121],
       [ 4.907 ,  2.3577,  4.4331,  1.5035],
       [ 5.2309,  3.3794,  1.4426,  0.6711],
       [ 4.9332,  3.1473,  1.6354,  0.2213],
       [ 5.9293,  2.7036,  3.8511,  1.29  ],
       [ 5.7858,  2.4233,  3.9115,  1.037 ],
       [ 4.8445,  3.0735,  1.5496,  0.2076],
       [ 7.0921,  3.7038,  6.0055,  2.4472],
       [ 5.2373,  2.2789,  2.9361,  0.9508],
       [ 5.8564,  2.5985,  3.4276,  0.9927],
       [ 7.7707,  2.7053,  6.6353,  1.9698],
       [ 6.5882,  3.0069,  4.5496,  1.4054],
       [ 7.9983,  3.6455,  6.3562,  2.0261],
       [ 5.6892,  2.7705,  5.0896,  2.3305],
       [ 5.4506,  3.5979,  1.4419,  0.3277]]), 'Y': array([0, 1, 2, 0, 2, 2, 2, 0, 0, 1, 1, 0, 2, 1, 1, 2, 1, 2, 2, 0])}
{'X': array([[ 5.6808,  2.794 ,  5.1394,  1.834 ],
       [ 5.1402,  3.4474,  1.5758,  0.3242],
       [ 6.9582,  3.1497,  4.8439,  1.3205],
       [ 5.1385,  3.4029,  1.7577,  0.7445],
       [ 6.6509,  3.2288,  5.9159,  2.2224],
       [ 5.4502,  3.9176,  1.1924,  0.4798],
       [ 6.1775,  3.0006,  4.9417,  1.8244],
       [ 4.7371,  3.1337,  1.4098,  0.1238],
       [ 5.976 ,  1.9877,  4.0675,  0.937 ],
       [ 4.939 ,  3.5439,  1.0957,  0.3607],
       [ 5.5374,  3.4114,  1.2487,  0.4112],
       [ 6.8033,  3.0698,  5.3864,  2.0485],
       [ 5.1687,  3.5103,  1.2541,  0.3091],
       [ 5.1002,  3.1303,  1.3174,  0.2296],
       [ 7.4721,  2.7944,  6.2278,  1.868 ],
       [ 4.799 ,  3.7621,  1.3563,  0.259 ],
       [ 4.8656,  3.1615,  1.2278,  0.2431],
       [ 6.5452,  3.0624,  5.8322,  2.1631],
       [ 5.5798,  3.7316,  1.5406,  0.1533],
       [ 7.7141,  2.8549,  6.1217,  2.1333]]), 'Y': array([2, 0, 1, 0, 2, 0, 2, 0, 1, 0, 0, 2, 0, 0, 2, 0, 0, 2, 0, 2])}
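
To see why wrapping the generator function matters, recall that a plain Python generator cannot be restarted once it has been advanced. The sketch below uses a hypothetical stand-in class, RestartableStream, which is not part of pescador and is not its implementation. It only stores the generator function and its arguments and builds a fresh generator on every call. The toy `counter` generator and the use of `itertools.islice` to emulate `max_iter` are assumptions made for illustration.

```python
import itertools


class RestartableStream:
    """Hypothetical stand-in illustrating the idea of a restartable stream."""

    def __init__(self, func, *args, **kwargs):
        # Keep the recipe (function + arguments), not a live generator.
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, max_iter=None):
        # Build a brand-new generator each time, then optionally truncate it.
        gen = self.func(*self.args, **self.kwargs)
        return itertools.islice(gen, max_iter)


def counter(start):
    """Toy infinite generator used only for this illustration."""
    n = start
    while True:
        yield n
        n += 1


# A bare generator keeps its position between uses...
gen = counter(0)
first_pass = list(itertools.islice(gen, 3))
second_pass = list(itertools.islice(gen, 3))  # continues where it left off

# ...whereas the wrapper starts over on every call.
stream = RestartableStream(counter, 0)
epoch_1 = list(stream(max_iter=3))
epoch_2 = list(stream(max_iter=3))

print(first_pass, second_pass)
print(epoch_1, epoch_2)
```

Storing the function and its arguments rather than a live generator is what makes a second pass possible.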

Benchmarking

We can benchmark our learner's efficiency by running a couple of experiments on the Iris dataset.

Our classifier will be L1-regularized logistic regression.
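
For reference, the objective this corresponds to can be sketched as follows, using scikit-learn's documented SGD formulation for a single binary (one-versus-rest) problem with labels y_i in {-1, +1}. The symbols w, b, and alpha are notation introduced here, not taken from the notebook.

```latex
\min_{w, b} \; \frac{1}{n} \sum_{i=1}^{n}
    \log\left(1 + \exp\left(-y_i \left(w^\top x_i + b\right)\right)\right)
    + \alpha \lVert w \rVert_1
```

Here alpha is the regularization strength (the `alpha` parameter of SGDClassifier), and the L1 penalty encourages sparse weight vectors.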


In [10]:
%%time
ss = sklearn.model_selection.ShuffleSplit(n_splits=2, test_size=0.2)
for train, test in ss.split(np.arange(len(X))):
    
    # Make an SGD learner, nothing fancy here.
    # (Recent scikit-learn releases spell this loss 'log_loss' and no longer accept n_iter.)
    classifier = sklearn.linear_model.SGDClassifier(verbose=0, 
                                                    loss='log',
                                                    penalty='l1', 
                                                    n_iter=1)
    
    # Again, build a streamer object
    batches = pescador.Streamer(batch_sampler, X[train], Y[train])

    # And train the model on the stream.
    n_steps = 0
    for batch in batches(max_iter=5e3):
        classifier.partial_fit(batch['X'], batch['Y'], classes=classes)
        
        n_steps += 1
    
    # How does it do on the test set?
    print('Test-set accuracy: {:.3f}'.format(sklearn.metrics.accuracy_score(Y[test], classifier.predict(X[test]))))
    print('# Steps: ', n_steps)


Test-set accuracy: 0.967
# Steps:  5000
Test-set accuracy: 1.000
# Steps:  5000
CPU times: user 8.97 s, sys: 173 ms, total: 9.15 s
Wall time: 9.73 s

Parallelism

The learner and the data generator will rarely run at the same speed. If the data generator has higher latency than the learner (here, SGDClassifier), the learner sits idle waiting for batches, and training slows down.

Pescador uses ZeroMQ to run data stream generation in a separate process, effectively decoupling it from the learner.
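
The decoupling idea can be illustrated with the standard library alone. The sketch below is not how pescador works internally: it swaps in a background thread and a bounded `queue.Queue` in place of a separate process and ZeroMQ sockets. The names `slow_batches` and `prefetch`, the artificial delay, and the buffer size are hypothetical.

```python
import queue
import threading
import time


def slow_batches(n_batches, delay=0.01):
    """Toy generator that simulates a high-latency data source."""
    for i in range(n_batches):
        time.sleep(delay)  # stand-in for expensive loading or augmentation
        yield {'batch': i}


def prefetch(generator, buffer_size=4):
    """Consume `generator` in a background thread and yield its items."""
    buffer = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the stream

    def producer():
        for item in generator:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(done)

    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = buffer.get()  # blocks until the producer has something ready
        if item is done:
            return
        yield item


# The consumer sees the same batches, in the same order, but the producer
# keeps filling the buffer while the consumer is busy with earlier batches.
received = [b['batch'] for b in prefetch(slow_batches(5))]
print(received)
```

A thread only overlaps waiting time (I/O, sleeps) with the consumer's work. Running the generator in a separate process, as the ZMQStreamer example does, also lets CPU-bound generation proceed alongside the learner.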


In [11]:
%%time
ss = sklearn.model_selection.ShuffleSplit(n_splits=2, test_size=0.2)
for train, test in ss.split(np.arange(len(X))):
    
    # Make an SGD learner, nothing fancy here
    classifier = sklearn.linear_model.SGDClassifier(verbose=0, 
                                                    loss='log',
                                                    penalty='l1', 
                                                    n_iter=1)
    
    # First, turn the batch_sampler function into a Streamer object
    batches = pescador.Streamer(batch_sampler, X[train], Y[train])
    
    # Then, run this stream in a second process
    zmq_stream = pescador.ZMQStreamer(batches, 5156)
    
    # And train the model on the stream.
    n_steps = 0
    for batch in zmq_stream(max_iter=5e3):
        classifier.partial_fit(batch['X'], batch['Y'], classes=classes)
        
        n_steps += 1
    
    # How does it do on the test set?
    print('Test-set accuracy: {:.3f}'.format(sklearn.metrics.accuracy_score(Y[test], classifier.predict(X[test]))))
    print('# Steps: ', n_steps)


Test-set accuracy: 1.000
# Steps:  5000
Test-set accuracy: 1.000
# Steps:  5000
CPU times: user 8.77 s, sys: 186 ms, total: 8.95 s
Wall time: 9.64 s